// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "ipc/mojo/ipc_message_pipe_reader.h"

#include <stdint.h>
#include <utility>

#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/single_thread_task_runner.h"
#include "base/thread_task_runner_handle.h"
#include "ipc/mojo/async_handle_waiter.h"
#include "ipc/mojo/ipc_channel_mojo.h"

namespace IPC {
namespace internal {

    MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle,
        MessagePipeReader::Delegate* delegate)
        : pipe_(std::move(handle))
        , handle_copy_(pipe_.get().value())
        , delegate_(delegate)
        , async_waiter_(new AsyncHandleWaiter(
              base::Bind(&MessagePipeReader::PipeIsReady, base::Unretained(this))))
        , pending_send_error_(MOJO_RESULT_OK)
    {
    }

    MessagePipeReader::~MessagePipeReader()
    {
        DCHECK(thread_checker_.CalledOnValidThread());
        // The pipe should be closed before deletion.
        CHECK(!IsValid());
    }

    void MessagePipeReader::Close()
    {
        DCHECK(thread_checker_.CalledOnValidThread());
        async_waiter_.reset();
        pipe_.reset();
        OnPipeClosed();
    }

    void MessagePipeReader::CloseWithError(MojoResult error)
    {
        DCHECK(thread_checker_.CalledOnValidThread());
        OnPipeError(error);
        Close();
    }

    void MessagePipeReader::CloseWithErrorIfPending()
    {
        DCHECK(thread_checker_.CalledOnValidThread());
        MojoResult pending_error = base::subtle::NoBarrier_Load(&pending_send_error_);
        if (pending_error == MOJO_RESULT_OK)
            return;
        // NOTE: This races with Send(), and therefore the value of
        // pending_send_error() can change.
        CloseWithError(pending_error);
        return;
    }

    void MessagePipeReader::CloseWithErrorLater(MojoResult error)
    {
        DCHECK_NE(error, MOJO_RESULT_OK);
        // NOTE: No assumptions about the value of |pending_send_error_| or whether or
        // not the error has been signaled can be made. If Send() is called
        // immediately before Close() and errors, it's possible for the error to not
        // be signaled.
        base::subtle::NoBarrier_Store(&pending_send_error_, error);
    }

    bool MessagePipeReader::Send(scoped_ptr<Message> message)
    {
        TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
            "MessagePipeReader::Send",
            message->flags(),
            TRACE_EVENT_FLAG_FLOW_OUT);
        std::vector<MojoHandle> handles;
        MojoResult result = MOJO_RESULT_OK;
        result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
        if (result == MOJO_RESULT_OK) {
            result = MojoWriteMessage(handle(),
                message->data(),
                static_cast<uint32_t>(message->size()),
                handles.empty() ? nullptr : &handles[0],
                static_cast<uint32_t>(handles.size()),
                MOJO_WRITE_MESSAGE_FLAG_NONE);
        }

        if (result != MOJO_RESULT_OK) {
            std::for_each(handles.begin(), handles.end(), &MojoClose);
            // We cannot call CloseWithError() here as Send() is protected by
            // ChannelMojo's lock and CloseWithError() could re-enter ChannelMojo. We
            // cannot call CloseWithError() also because Send() can be called from
            // non-UI thread while OnPipeError() expects to be called on IO thread.
            CloseWithErrorLater(result);
            return false;
        }

        return true;
    }

    void MessagePipeReader::OnMessageReceived()
    {
        Message message(data_buffer().empty() ? "" : &data_buffer()[0],
            static_cast<uint32_t>(data_buffer().size()));

        std::vector<MojoHandle> handle_buffer;
        TakeHandleBuffer(&handle_buffer);
        MojoResult write_result = ChannelMojo::WriteToMessageAttachmentSet(handle_buffer, &message);
        if (write_result != MOJO_RESULT_OK) {
            CloseWithError(write_result);
            return;
        }

        TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
            "MessagePipeReader::OnMessageReceived",
            message.flags(),
            TRACE_EVENT_FLAG_FLOW_IN);
        delegate_->OnMessageReceived(message);
    }

    void MessagePipeReader::OnPipeClosed()
    {
        DCHECK(thread_checker_.CalledOnValidThread());
        if (!delegate_)
            return;
        delegate_->OnPipeClosed(this);
        delegate_ = nullptr;
    }

    void MessagePipeReader::OnPipeError(MojoResult error)
    {
        DCHECK(thread_checker_.CalledOnValidThread());
        if (!delegate_)
            return;
        delegate_->OnPipeError(this);
    }

    MojoResult MessagePipeReader::ReadMessageBytes()
    {
        DCHECK(thread_checker_.CalledOnValidThread());
        DCHECK(handle_buffer_.empty());

        uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
        uint32_t num_handles = 0;
        MojoResult result = MojoReadMessage(pipe_.get().value(),
            num_bytes ? &data_buffer_[0] : nullptr,
            &num_bytes,
            nullptr,
            &num_handles,
            MOJO_READ_MESSAGE_FLAG_NONE);
        data_buffer_.resize(num_bytes);
        handle_buffer_.resize(num_handles);
        if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
            // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
            // it needs more bufer. So we re-read it with resized buffers.
            result = MojoReadMessage(pipe_.get().value(),
                num_bytes ? &data_buffer_[0] : nullptr,
                &num_bytes,
                num_handles ? &handle_buffer_[0] : nullptr,
                &num_handles,
                MOJO_READ_MESSAGE_FLAG_NONE);
        }

        DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
        DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
        return result;
    }

    void MessagePipeReader::ReadAvailableMessages()
    {
        DCHECK(thread_checker_.CalledOnValidThread());
        while (pipe_.is_valid()) {
            MojoResult read_result = ReadMessageBytes();
            if (read_result == MOJO_RESULT_SHOULD_WAIT)
                break;
            if (read_result != MOJO_RESULT_OK) {
                DLOG(WARNING)
                    << "Pipe got error from ReadMessage(). Closing: " << read_result;
                OnPipeError(read_result);
                Close();
                break;
            }

            OnMessageReceived();
        }
    }

    void MessagePipeReader::ReadMessagesThenWait()
    {
        DCHECK(thread_checker_.CalledOnValidThread());
        while (true) {
            ReadAvailableMessages();
            if (!pipe_.is_valid())
                break;
            // |Wait()| is safe to call only after all messages are read.
            // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise.
            // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
            // MessagePipe.
            MojoResult result = async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE);
            // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages
            // that have been arrived after the last |ReadAvailableMessages()|.
            // We have to consume then and retry in that case.
            if (result != MOJO_RESULT_ALREADY_EXISTS) {
                if (result != MOJO_RESULT_OK) {
                    LOG(ERROR) << "Failed to wait on the pipe. Result is " << result;
                    OnPipeError(result);
                    Close();
                }

                break;
            }
        }
    }

    void MessagePipeReader::PipeIsReady(MojoResult wait_result)
    {
        DCHECK(thread_checker_.CalledOnValidThread());
        CloseWithErrorIfPending();
        if (!IsValid()) {
            // There was a pending error and it closed the pipe.
            // We cannot do the work anymore.
            return;
        }

        if (wait_result != MOJO_RESULT_OK) {
            if (wait_result != MOJO_RESULT_ABORTED) {
                // FAILED_PRECONDITION happens every time the peer is dead so
                // it isn't worth polluting the log message.
                LOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION)
                    << "Pipe got error from the waiter. Closing: " << wait_result;
                OnPipeError(wait_result);
            }

            Close();
            return;
        }

        ReadMessagesThenWait();
    }

    void MessagePipeReader::DelayedDeleter::operator()(
        MessagePipeReader* ptr) const
    {
        ptr->Close();
        base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE,
            base::Bind(&DeleteNow, ptr));
    }

} // namespace internal
} // namespace IPC
